<?php

/*
 * This file is part of SwiftMailer. (c) 2004-2009 Chris Corbyn For the full copyright and license information, please view the LICENSE file that was distributed with this source code.
 */

/**
 * A generic IoBuffer implementation supporting remote sockets and local processes.
 * 
 * @package Swift
 * @subpackage Transport
 * @author Chris Corbyn
 */
class Swift_Transport_StreamBuffer extends Swift_ByteStream_AbstractFilterableInputStream implements Swift_Transport_IoBuffer {
	/**
	 * A primary socket
	 */
	private $_stream;
	
	/**
	 * The input stream
	 */
	private $_in;
	
	/**
	 * The output stream
	 */
	private $_out;
	
	/**
	 * Buffer initialization parameters
	 */
	private $_params = array ();
	
	/**
	 * The ReplacementFilterFactory
	 */
	private $_replacementFactory;
	
	/**
	 * Translations performed on data being streamed into the buffer
	 */
	private $_translations = array ();
	
	/**
	 * Create a new StreamBuffer using $replacementFactory for transformations.
	 * 
	 * @param Swift_ReplacementFilterFactory $replacementFactory        	
	 */
	public function __construct(Swift_ReplacementFilterFactory $replacementFactory) {
		$this->_replacementFactory = $replacementFactory;
	}
	
	/**
	 * Perform any initialization needed, using the given $params.
	 * Parameters will vary depending upon the type of IoBuffer used.
	 * 
	 * @param array $params        	
	 */
	public function initialize(array $params) {
		$this->_params = $params;
		switch ($params ['type']) {
			case self::TYPE_PROCESS :
				$this->_establishProcessConnection ();
				break;
			case self::TYPE_SOCKET :
			default :
				$this->_establishSocketConnection ();
				break;
		}
	}
	
	/**
	 * Set an individual param on the buffer (e.g.
	 * switching to SSL).
	 * 
	 * @param string $param        	
	 * @param mixed $value        	
	 */
	public function setParam($param, $value) {
		if (isset ( $this->_stream )) {
			switch ($param) {
				case 'timeout' :
					if ($this->_stream) {
						stream_set_timeout ( $this->_stream, $value );
					}
					break;
				
				case 'blocking' :
					if ($this->_stream) {
						stream_set_blocking ( $this->_stream, 1 );
					}
			}
		}
		$this->_params [$param] = $value;
	}
	public function startTLS() {
		return stream_socket_enable_crypto ( $this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT );
	}
	
	/**
	 * Perform any shutdown logic needed.
	 */
	public function terminate() {
		if (isset ( $this->_stream )) {
			switch ($this->_params ['type']) {
				case self::TYPE_PROCESS :
					fclose ( $this->_in );
					fclose ( $this->_out );
					proc_close ( $this->_stream );
					break;
				case self::TYPE_SOCKET :
				default :
					fclose ( $this->_stream );
					break;
			}
		}
		$this->_stream = null;
		$this->_out = null;
		$this->_in = null;
	}
	
	/**
	 * Set an array of string replacements which should be made on data written
	 * to the buffer.
	 * This could replace LF with CRLF for example.
	 * 
	 * @param string[] $replacements        	
	 */
	public function setWriteTranslations(array $replacements) {
		foreach ( $this->_translations as $search => $replace ) {
			if (! isset ( $replacements [$search] )) {
				$this->removeFilter ( $search );
				unset ( $this->_translations [$search] );
			}
		}
		
		foreach ( $replacements as $search => $replace ) {
			if (! isset ( $this->_translations [$search] )) {
				$this->addFilter ( $this->_replacementFactory->createFilter ( $search, $replace ), $search );
				$this->_translations [$search] = true;
			}
		}
	}
	
	/**
	 * Get a line of output (including any CRLF).
	 * The $sequence number comes from any writes and may or may not be used
	 * depending upon the implementation.
	 * 
	 * @param int $sequence
	 *        	of last write to scan from
	 * @return string
	 */
	public function readLine($sequence) {
		if (isset ( $this->_out ) && ! feof ( $this->_out )) {
			$line = fgets ( $this->_out );
			if (strlen ( $line ) == 0) {
				$metas = stream_get_meta_data ( $this->_out );
				if ($metas ['timed_out']) {
					throw new Swift_IoException ( 'Connection to ' . $this->_getReadConnectionDescription () . ' Timed Out' );
				}
			}
			
			return $line;
		}
	}
	
	/**
	 * Reads $length bytes from the stream into a string and moves the pointer
	 * through the stream by $length.
	 * If less bytes exist than are requested the
	 * remaining bytes are given instead. If no bytes are remaining at all, boolean
	 * false is returned.
	 * 
	 * @param int $length        	
	 * @return string
	 */
	public function read($length) {
		if (isset ( $this->_out ) && ! feof ( $this->_out )) {
			$ret = fread ( $this->_out, $length );
			if (strlen ( $ret ) == 0) {
				$metas = stream_get_meta_data ( $this->_out );
				if ($metas ['timed_out']) {
					throw new Swift_IoException ( 'Connection to ' . $this->_getReadConnectionDescription () . ' Timed Out' );
				}
			}
			
			return $ret;
		}
	}
	
	/**
	 * Not implemented
	 */
	public function setReadPointer($byteOffset) {
	}
	
	// -- Protected methods
	
	/**
	 * Flush the stream contents
	 */
	protected function _flush() {
		if (isset ( $this->_in )) {
			fflush ( $this->_in );
		}
	}
	
	/**
	 * Write this bytes to the stream
	 */
	protected function _commit($bytes) {
		if (isset ( $this->_in ) && fwrite ( $this->_in, $bytes )) {
			return ++ $this->_sequence;
		}
	}
	
	// -- Private methods
	
	/**
	 * Establishes a connection to a remote server.
	 * 
	 * @access private
	 */
	private function _establishSocketConnection() {
		$host = $this->_params ['host'];
		if (! empty ( $this->_params ['protocol'] )) {
			$host = $this->_params ['protocol'] . '://' . $host;
		}
		$timeout = 15;
		if (! empty ( $this->_params ['timeout'] )) {
			$timeout = $this->_params ['timeout'];
		}
		$options = array ();
		if (! empty ( $this->_params ['sourceIp'] )) {
			$options ['socket'] ['bindto'] = $this->_params ['sourceIp'] . ':0';
		}
		$this->_stream = @stream_socket_client ( $host . ':' . $this->_params ['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create ( $options ) );
		if (false === $this->_stream) {
			throw new Swift_TransportException ( 'Connection could not be established with host ' . $this->_params ['host'] . ' [' . $errstr . ' #' . $errno . ']' );
		}
		if (! empty ( $this->_params ['blocking'] )) {
			stream_set_blocking ( $this->_stream, 1 );
		} else {
			stream_set_blocking ( $this->_stream, 0 );
		}
		stream_set_timeout ( $this->_stream, $timeout );
		$this->_in = & $this->_stream;
		$this->_out = & $this->_stream;
	}
	
	/**
	 * Opens a process for input/output.
	 * 
	 * @access private
	 */
	private function _establishProcessConnection() {
		$command = $this->_params ['command'];
		$descriptorSpec = array (
				0 => array (
						'pipe',
						'r' 
				),
				1 => array (
						'pipe',
						'w' 
				),
				2 => array (
						'pipe',
						'w' 
				) 
		);
		$this->_stream = proc_open ( $command, $descriptorSpec, $pipes );
		stream_set_blocking ( $pipes [2], 0 );
		if ($err = stream_get_contents ( $pipes [2] )) {
			throw new Swift_TransportException ( 'Process could not be started [' . $err . ']' );
		}
		$this->_in = & $pipes [0];
		$this->_out = & $pipes [1];
	}
	private function _getReadConnectionDescription() {
		switch ($this->_params ['type']) {
			case self::TYPE_PROCESS :
				return 'Process ' . $this->_params ['command'];
				break;
			
			case self::TYPE_SOCKET :
			default :
				$host = $this->_params ['host'];
				if (! empty ( $this->_params ['protocol'] )) {
					$host = $this->_params ['protocol'] . '://' . $host;
				}
				$host .= ':' . $this->_params ['port'];
				
				return $host;
				break;
		}
	}
}
